3  Chapter 2: Processing Vector Data with Apache Sedona and Sparklyr in R

Using Vector Data to Obtain Median Household Income in Pickup and Droppoff Neighbourhoods

3.1 Introduction

In this part, we are going to use the saved locations data to add pickup and dropoff neighbourhoods. We shall then use the neighbourhoods to obtain the median incomes of the pickup and dropoff neighbourhoods. Income is a proxy for affluence. Humour me by assuming that there is a relationship between the duration of a taxi trip and the affluence of either or both the pickup or dropoff locations.

We shall introduce two new datasets: NYC NTA boundaries based on the 2020 census, and NTA median income based on the 2012 ACS. The boundaries data is in vector format, while the income data is in CSV format. To do this, we shall use Apache Sedona to merge the boundaries and income data. We will then perform a spatial join on the coordinates with the boundaries, determining the pickup and dropoff neighbourhoods.

By the way, this will be a new file and not the same as the one used for the previous analysis. When working in local mode, I have found that it is more feasible to separate preprocessing into multiple steps. Running too many transformations is likely to result in out-of-memory errors and take a very long time to complete.

3.2 Installing and loading packages

install.packages("apache.sedona")
# Load necessary libraries for Spark, geospatial data, and data manipulation
library(arrow)
library(sparklyr)
library(sf)
library(dplyr)

3.3 Configuring Spark

# Define the Spark directory for temporary files
spark_dir <- file.path(getwd(), "data", "spark")

The configuration will mostly be kept similar to the one in the first file. The main difference is that some of the Delta configurations are explicitly included and not added as a package. This is because Delta and Apache Sedona clash when Delta is installed as a package while creating the Spark context.

You will notice a few differences in how the Spark context is set up this time around.

# Create an empty list for Spark configuration settings
config <- list()

# Set Spark configurations for memory and performance optimisation

# Configure some delta specific options
config$spark.sql.extensions <- "io.delta.sql.DeltaSparkSessionExtension"
config$spark.sql.catalog.spark_catalog <- "org.apache.spark.sql.delta.catalog.DeltaCatalog"

# Use KryoSerializer for better performance
config$spark.serializer <- "org.apache.spark.serializer.KryoSerializer"  

# Set temporary directory for Spark
config$`sparklyr.shell.driver-java-options` <- paste0("-Djava.io.tmpdir=", spark_dir)  

# Use compressed Oops for JVM performance
config$`sparklyr.shell.driver-java-options` <- "-XX:+UseCompressedOops"  

# Allocate 8GB of memory for the Spark driver
config$`sparklyr.shell.driver-memory` <- '10G'  

# Set fraction of heap memory used for Spark storage
config$spark.memory.fraction <- 0.7  

# Set shuffle partitions (local setting based on workload)
config$spark.sql.shuffle.partitions.local <- 24  

# Set extra memory for driver
config$spark.driver.extraJavaOptions <- "-Xmx1G"  

# Enable off-heap memory usage
config$spark.memory.offHeap.enabled <- "true" 

# Set 4GB for off-heap memory
config$spark.memory.offHeap.size <- "2g"  

# Disable shuffle spill to disk
config$spark.sql.shuffle.spill <- "false"  

# Periodic garbage collection interval
config$spark.cleaner.periodicGC.interval <- "60s"  

# Set max partition size for shuffle files
config$spark.sql.files.maxPartitionBytes <- "200m"  

# Enable adaptive query execution
config$spark.sql.adaptive.enabled <- "true" 

3.4 Instantiating spark context

As you can see, when initiating our Spark context, we explicitly include files associated with Delta and Apache Sedona. By doing so, Apache Sedona and Delta do not clash with each other.

# Connect to Spark with the defined configuration and additional packages for geospatial processing
sc <- spark_connect(
  master = "local[*]",
  config = config,
  packages = c(
    "io.delta:delta-spark_2.12:3.3.0",
    "org.apache.sedona:sedona-spark-shaded-3.5_2.12:1.7.0",
    "org.datasyslab:geotools-wrapper:1.7.0-28.5"
  )
)

3.5 Loading Apache Sedona

It is only after initializing our Spark context that we can now load the Apache Sedona library and initialize its context. I know it seems like a lot, and it is, but this is how we can use both Apache Sedona and Delta in R, based on my research.

We are now ready to get started!

library(apache.sedona)
invoke_static(
  sc,
  "org.apache.sedona.spark.SedonaContext",
  "create",
  spark_session(sc),
  "r"
)
# Launch Spark web UI to monitor the Spark session
spark_web(sc)

3.6 Loading datasets

3.6.1 Locations data

We now read our location data, which is thankfully in Delta format.

# Define the folder containing location data (latitude and longitude of yellow cabs)
locations_sdf_parent_folder <- file.path(getwd(), "data", "locations_sdf")
locations_sdf <- spark_read_delta(
  sc, 
  path = locations_sdf_parent_folder, 
  name = "locations_sdf"
  ) %>% 
    sdf_repartition(24)

print(locations_sdf, width=Inf)

The data contains nearly 94 million rows!

# Print the number of rows in the locations SDF (Spark DataFrame)
sdf_nrow(locations_sdf)

3.6.2 Median household income by neighbourhood data

We now load the average household income by neighbourhood in NYC.

# Load income data (household income by NYC neighbourhood)
nyc_nta_hh_income_file_path <- file.path(getwd(), "data", "nyc_nta_med_inc", "nyc_nta_med_inc.csv")
nyc_nta_hh_income <- spark_read_csv(sc, path = nyc_nta_hh_income_file_path, name = "nyc_nta_hh_income")
# Display the income data
print(nyc_nta_hh_income, width = Inf)

3.6.3 NYC neighbourhoods data

We also load the shapefile using Apache Sedona. Note that we point Sedona to the entire folder and not just the specific .shp file, as is the case when reading shapefiles via sf.

# Load the shapefile for NYC neighbourhoods
ny_neighs_pathfile <- file.path(getwd(), "data", "shapefiles", "nynta2020_25a")
ny_neighbourhoods_shp <- spark_read_shapefile(sc, path = ny_neighs_pathfile, name = "ny_neighbourhoods_shp")
# Display a quick summary of the shapefile data
ny_neighbourhoods_shp %>% glimpse()

3.7 Associating neighbourhood with median household income

We now join the income and boundaries data using their common ID.

# Join the neighbourhood shapefile with the income data
ny_neighbourhoods_shp <- ny_neighbourhoods_shp %>%
  left_join(nyc_nta_hh_income, by = c("NTA2020" = "GeoID"))

Now, we need to determine the relevant CRS that our boundaries data uses. If it differs from EPSG:4324, we must convert it so that we can match it with the pickup and dropoff coordinates. I have not found a way to determine the CRS using Apache Sedona, so I use sf for that.

# Read the shapefile as an SF (Simple Features) object for geospatial operations
ny_neighs_sf <- st_read(file.path(ny_neighs_pathfile, "nynta2020.shp"))
st_crs(ny_neighs_sf)

Knowing that it is EPSG:2263, we can now convert it to EPSG:4324.

# Reproject the geometries to a different coordinate reference system (CRS) for consistency
ny_neighbourhoods_shp <- ny_neighbourhoods_shp %>%
  mutate(
    geometry = st_transform(
      geometry,
      "epsg:2263",  # Source CRS
      "epsg:4326",  # Target CRS
      F
    )
  ) %>%
  select(
    -c(
      BoroCode,
      CountyFIPS,
      NTAAbbrev,
      NTAType,
      CDTA2020,
      CDTAName,
      Shape_Leng,
      Shape_Area
    )
  )

3.8 Joining locations data with neighbourhoods data

Because the boundaries data is very small (about 2 MB on disk), we can cache it in memory for faster access. Generally, you are encouraged to cache data that is less than 10 MB. We are also broadcasting the neighbourhoods data to improve performance. Broadcasting means that our data is shared in its entirety with every core (in local mode) or executor (in cluster mode) so it is not shuffled when joining. Even if we did not explicitly broadcast our data, it most likely would have been broadcasted automatically due to Adaptive Query Execution (AQE) since we enabled it at the start using the option config$spark.sql.adaptive.enabled <- "true". AQE finds the optimal way of conducting joins, and since our neighbourhoods data is minuscule, chances are that it would have been broadcasted to prevent unnecessary shuffling.

# Persist the neighbourhood shapefile in memory for faster access
ny_neighbourhoods_shp <- sdf_broadcast(ny_neighbourhoods_shp)
sdf_persist(ny_neighbourhoods_shp, storage.level = "MEMORY_ONLY")

I have found that it is best to use Spatial SQL when conducting spatial joins or any other spatial analysis using Apache Sedona functions. To do this, we first need to register our dataframes as temporary SQL views. This will be our next step.

# Register the dataframes as temporary SQL views for querying
locations_sdf %>% sdf_register("locations_sdf_view")
ny_neighbourhoods_shp %>% sdf_register("ny_neighbourhoods_shp_view")

Upon registration, we can now conduct a spatial join, asking Apache Sedona to find neighbourhoods that contain specific coordinates using the ST_Contains function. You can find documentation on all available Apache Sedona vector functions here.

# Perform a spatial join to associate each location (latitude, longitude) with the corresponding neighbourhood
locations_sdf_updated <- sdf_sql(
  sc,
  "
  SELECT /*+ BROADCAST(b) */ a.*, b.*
  FROM locations_sdf_view a
  LEFT JOIN ny_neighbourhoods_shp_view b
    ON ST_Contains(b.geometry, ST_Point(a.longitude, a.latitude))
  "
)

3.9 Writing data to disk

Before saving, we remove the geometry column, as Delta cannot write geometry columns. Moreover, there is no need to keep it in the data, as we don’t plan on mapping all the data just yet. If you need to write big data geometry files, consider using the GeoParquet format. You can do so using Spark’s spark_write_geoparquet function or the spark_write_source function with the mode set to GeoParquet.

# Remove the geometry column from the final dataset for further analysis
locations_sdf_updated_no_geom <- locations_sdf_updated %>%
  select(-c(geometry))

Our final data is as shown below. Not too bad for the few lines of code written.

# Print the updated data with all relevant fields (no geometry)
withr::with_options(
  list(pillar.sigfig = 11),
  print(locations_sdf_updated_no_geom)
)

We now write our data to Delta format as usual.

# Define the file path for saving the updated dataset
save_locations_sdf_updated_one_filepath <- file.path(getwd(), "data", "locations_sdf_updated_one")

# Save the updated dataset to Delta format
spark_write_delta(
  locations_sdf_updated_no_geom,
  path = save_locations_sdf_updated_one_filepath
)

And disconnect our spark instance.

# Disconnect from the Spark session once done
spark_disconnect(sc)